import 'dart:async';

import 'package:rxdart/rxdart.dart';

import '../../common/test_page.dart';
import '../utils.dart';

/// Returns a new stream that emits the integers 1 through 4, in order.
///
/// A fresh stream is built on every call, so each test gets its own source.
Stream<int> _getStream() {
  const values = <int>[1, 2, 3, 4];
  return Stream<int>.fromIterable(values);
}

extension on Duration {
  /// Builds a timer stream for this duration via [Rx.timer], using `null` as
  /// the emitted value.
  ///
  /// Used below as the per-event delay trigger for `delayWhen`.
  Stream<void> asTimerStream() {
    return Rx.timer<void>(null, this);
  }
}

/// A [TestPage] that registers test cases for the `delay` and `delayWhen`
/// stream operators.
///
/// All cases are registered from the constructor through `test`.
///
/// NOTE(review): `expect` is invoked here with a single argument and no
/// matcher; what it actually verifies depends on the helper declared outside
/// this file — confirm.
///
/// NOTE(review): several cases subscribe with `listen` and return without
/// waiting for the subscription, because they cancel mid-stream or the stream
/// never completes. Their callbacks may run after the test body has
/// returned — confirm the harness tolerates this.
class DelayTestPage extends TestPage {
  /// Creates the page, forwarding [title] to [TestPage], and registers every
  /// `delay` / `delayWhen` case.
  DelayTestPage(super.title) {
    // ---------------------------------------------------------------------
    // delay
    // ---------------------------------------------------------------------

    test('Rx.delay', () async {
      // Await the stream so every delayed event is checked before the test
      // body completes.
      await _getStream()
          .delay(const Duration(milliseconds: 200))
          .forEach((result) {
        expect(result);
      });
    });

    test('Rx.delay.zero', () {
      expect(
        _getStream().delay(Duration.zero),
      );
    });

    test('Rx.delay.shouldBeDelayed', () async {
      await _getStream()
          .delay(const Duration(milliseconds: 500))
          .timeInterval()
          .forEach((result) {
        expect(result.value);

        if (result.value == 1) {
          // First event: its interval is expected to include the delay.
          expect(result.interval.inMilliseconds);
        } else {
          // Later events: expected to follow almost immediately.
          expect(result.interval.inMilliseconds);
        }
      });
    });

    test('Rx.delay.reusable', () async {
      final transformer =
          DelayStreamTransformer<int>(const Duration(milliseconds: 200));

      // Apply the same transformer instance to two independent streams and
      // wait for both to drain; they still run concurrently.
      await Future.wait([
        _getStream().transform(transformer).forEach((result) {
          expect(result);
        }),
        _getStream().transform(transformer).forEach((result) {
          expect(result);
        }),
      ]);
    });

    test('Rx.delay.asBroadcastStream', () async {
      final stream = _getStream()
          .asBroadcastStream()
          .delay(const Duration(milliseconds: 200));

      // listen twice on same stream
      stream.listen(null);
      stream.listen(null);

      // code should reach here
      expect(true);
    });

    test('Rx.delay.error.shouldThrowA', () async {
      final streamWithError = Stream<void>.error(Exception())
          .delay(const Duration(milliseconds: 200));

      streamWithError.listen(
        null,
        onError: (Object e, StackTrace s) {
          expect(e);
        },
      );
    });

    test('Rx.delay.pause.resume', () async {
      late StreamSubscription<int> subscription;
      final stream = Stream.fromIterable(const [1, 2, 3])
          .delay(const Duration(milliseconds: 1));

      // Cancel as soon as the first value arrives.
      subscription = stream.listen((value) {
        expect(value);

        subscription.cancel();
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.delay.cancel.emits.nothing', () async {
      late StreamSubscription<int> subscription;
      final stream = Stream.fromIterable(const [1, 2, 3]).doOnDone(() {
        subscription.cancel();
      }).delay(const Duration(seconds: 10));

      // The subscription is cancelled when the base stream ends, well before
      // the 10 second delay elapses.
      subscription = stream.listen((_) {});
    });

    test('Rx.delay accidental broadcast', () async {
      final controller = StreamController<int>();

      final stream = controller.stream.delay(const Duration(seconds: 10));

      stream.listen(null);
      // Second listen on the same stream; the closure is handed to `expect`
      // rather than invoked here.
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.delay.nullable', () {
      nullableTest<String?>(
        (s) => s.delay(Duration.zero),
      );
    });

    // ---------------------------------------------------------------------
    // delayWhen
    // ---------------------------------------------------------------------

    test('Rx.delayWhen', () {
      expect(
        _getStream().delayWhen((_) => Stream.value(null)),
      );

      expect(
        _getStream()
            .delayWhen((_) => const Duration(milliseconds: 200).asTimerStream()),
      );

      expect(
        _getStream()
            .delayWhen((i) => Duration(milliseconds: 100 * i).asTimerStream()),
      );

      expect(
        _getStream().delayWhen(
          (i) => Duration(milliseconds: 100 * i).asTimerStream(),
          listenDelay: Rx.timer(null, const Duration(milliseconds: 100)),
        ),
      );
    });

    test('Rx.delayWhen.zero', () {
      expect(
        _getStream().delayWhen((_) => Duration.zero.asTimerStream()),
      );
    });

    test('Rx.delayWhen.shouldBeDelayed', () async {
      // Same 500 ms delay for every event.
      {
        await _getStream()
            .delayWhen((_) => const Duration(milliseconds: 500).asTimerStream())
            .timeInterval()
            .forEach((result) {
          expect(result.value);

          if (result.value == 1) {
            expect(
              result.interval.inMilliseconds,
            ); // should be delayed
          } else {
            expect(
              result.interval.inMilliseconds,
            ); // should be near instantaneous
          }
        });
      }

      // Delay grows by 500 ms per event.
      {
        await _getStream()
            .delayWhen((i) => Duration(milliseconds: 500 * i).asTimerStream())
            .timeInterval()
            .forEach((result) {
          expect(result.value);

          expect(
            (result.interval.inMilliseconds - 500).abs(),
          ); // deviation of the interval from 500 ms
        });
      }
    });

    test('Rx.delayWhen.shouldBeDelayed.listenDelay', () {
      void onData(TimeInterval<int> result) {
        expect(result.value);

        if (result.value == 1) {
          expect(
            result.interval.inMilliseconds,
          ); // should be delayed
        } else {
          expect(
            (result.interval.inMilliseconds - 500).abs(),
          ); // deviation of the interval from 500 ms
        }
      }

      _getStream()
          .delayWhen(
            (i) => Duration(milliseconds: 500 * i).asTimerStream(),
            listenDelay: Rx.timer(null, const Duration(milliseconds: 300)),
          )
          .timeInterval()
          .listen(onData);
    });

    test('Rx.delayWhen.reusable', () {
      final transformer = DelayWhenStreamTransformer<int>(
        (_) => const Duration(milliseconds: 200).asTimerStream(),
      );

      // The same transformer instance is applied to two independent streams.
      expect(
        _getStream().transform(transformer),
      );

      expect(
        _getStream().transform(transformer),
      );
    });

    test('Rx.delayWhen.asBroadcastStream', () {
      // Broadcast source, delayWhen applied afterwards.
      {
        final stream = _getStream()
            .asBroadcastStream()
            .delayWhen((_) => const Duration(milliseconds: 200).asTimerStream());

        // listen twice on same stream
        stream.listen(null);
        stream.listen(null);

        // code should reach here
        expect(true);
      }

      // delayWhen first, then converted to broadcast.
      {
        final stream = _getStream()
            .delayWhen((_) => const Duration(milliseconds: 200).asTimerStream())
            .asBroadcastStream();

        // listen twice on same stream
        stream.listen(null);
        stream.listen(null);

        // code should reach here
        expect(true);
      }

      // Same as above, with an explicit listenDelay stream.
      {
        final stream = _getStream()
            .delayWhen(
              (_) => const Duration(milliseconds: 200).asTimerStream(),
              listenDelay: Stream.value(null),
            )
            .asBroadcastStream();

        // listen twice on same stream
        stream.listen(null);
        stream.listen(null);

        // code should reach here
        expect(true);
      }
    });

    test('Rx.delayWhen.error.shouldThrowA', () {
      // Error originates from the source stream.
      expect(
        Stream<void>.error(Exception())
            .delayWhen((_) => const Duration(milliseconds: 200).asTimerStream()),
      );
    });

    test('Rx.delayWhen.error.shouldThrowB', () {
      // Error originates from the listenDelay stream.
      expect(
        Stream.value(0).delayWhen(
          (_) => const Duration(milliseconds: 200).asTimerStream(),
          listenDelay: Stream.error(Exception('listenDelay')),
        ),
      );
    });

    test('Rx.delayWhen.pause.resume', () async {
      late StreamSubscription<int> subscription;
      final stream = Stream.fromIterable(const [1, 2, 3])
          .delayWhen((_) => const Duration(milliseconds: 1).asTimerStream());

      // Cancel as soon as the first value arrives.
      subscription = stream.listen((value) {
        expect(value);

        subscription.cancel();
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.delayWhen.pause.resume.listenDelay', () {
      late StreamSubscription<int> subscription;
      final stream = Stream.fromIterable(const [1, 2, 3]).delayWhen(
        (_) => const Duration(milliseconds: 1).asTimerStream(),
        listenDelay: Rx.timer(null, const Duration(milliseconds: 200)),
      );

      // Cancel as soon as the first value arrives.
      subscription = stream.listen((value) {
        expect(value);

        subscription.cancel();
      });

      subscription.pause();
      subscription.resume();
    });

    test('Rx.delayWhen.cancel.emits.nothing', () {
      late StreamSubscription<int> subscription;
      final stream = _getStream()
          .doOnDone(() => subscription.cancel())
          .delayWhen((_) => const Duration(seconds: 10).asTimerStream());

      // We expect the onData callback to be called 0 times because the
      // subscription is cancelled when the base stream ends.
      subscription = stream.listen((_) {});
    });

    test('Rx.delayWhen.cancel.emits.nothing.listenDelay', () {
      late StreamSubscription<int> subscription;
      final stream =
          _getStream().doOnDone(() => subscription.cancel()).delayWhen(
                (_) => const Duration(seconds: 10).asTimerStream(),
                listenDelay: Stream.periodic(const Duration(seconds: 1)),
              );

      // We expect the onData callback to be called 0 times because the
      // subscription is cancelled when the base stream ends.
      subscription = stream.listen((_) {});
    });

    test('Rx.delayWhen.singleSubscription', () async {
      final controller = StreamController<int>();

      final stream = controller.stream
          .delayWhen((_) => const Duration(seconds: 10).asTimerStream());

      stream.listen(null);
      // Second listen on the same stream; the closure is handed to `expect`
      // rather than invoked here.
      expect(() => stream.listen(null));

      controller.add(1);
    });

    test('Rx.delayWhen.nullable', () {
      nullableTest<String?>(
        (s) => s.delayWhen((_) => Duration.zero.asTimerStream()),
      );
    });
  }
}